package com.dsp.stock.handle;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.log4j.Logger;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import cn.com.bo.util.crypto.MD5;
import cn.com.bo.util.redis.JedisUtil;
import cn.com.bo.util.redis.SerializeUtil;

import com.dsp.stock.conf.Config;
import com.dsp.stock.model.Log;

public class HandleReqLog extends BaseHandle {
	
	private Logger log = Logger.getLogger(HandleReqLog.class);
	
	private JedisUtil jedis = JedisUtil.getInstance();
	
	private Map<String,Log> map = new ConcurrentHashMap<String,Log>();
	
	private Map<String,Set<String>> sizeMap = new ConcurrentHashMap<String,Set<String>>();
	
	private Map<String,Set<String>> areaMap = new ConcurrentHashMap<String,Set<String>>();
	
	private Map<String,Set<String>> exchangeMap = new ConcurrentHashMap<String,Set<String>>();
	
	private Map<String,Set<String>> show_typeMap = new ConcurrentHashMap<String,Set<String>>();
	
	private Map<String,Set<String>> hourMap = new ConcurrentHashMap<String,Set<String>>();
	
	private Map<String,Set<String>> mediaMap = new ConcurrentHashMap<String,Set<String>>();
	
	private Map<String,Set<String>> spaceMap = new ConcurrentHashMap<String,Set<String>>();
	
	private Map<String,Set<String>> catMap = new ConcurrentHashMap<String,Set<String>>();
	
	private final int size = 200;
	
	
	public void parseReqLog(final String table){
		File[] reqPaths = reqPath();
		// 创建线程池
		ThreadPoolExecutor pool = new ThreadPoolExecutor(100, 100,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
		for (File req : reqPaths) {
			try {
		        int bufSize = 1048576;// 一次读取的字节长度
				FileChannel fileChannel = new RandomAccessFile(req,"r").getChannel(); 
				ByteBuffer rBuffer = ByteBuffer.allocate(bufSize); // 读取设置长度的字符
				//使用temp字节数组用于存储不完整的行的内容  
		        byte[] temp = new byte[0];  
		        byte[] bs = new byte[0];  
		        while(fileChannel.read(rBuffer) != -1) { 
		        	long start = System.currentTimeMillis();
		        	// position读取结束后的位置，相当于读取的长度
		        	bs = new byte[rBuffer.position()]; //用来存放读取的内容的数组
		        	rBuffer.rewind(); //将position设回0,所以你可以重读Buffer中的所有数据,此处如果不设置,无法使用下面的get方法  
		        	rBuffer.get(bs); //相当于rBuffer.get(bs,0,bs.length())：从position初始位置开始相对读,读bs.length个byte,并写入bs[0]到bs[bs.length-1]的区域 
		        	rBuffer.clear(); //这块的内容已经放入到bs，rbuffer清空
//		        	// 开始处理已经读取到的内容
		        	int startNum = 0;  
	                int LF = 10;//换行符  
	              //判断是否出现了换行符，注意这要区分LF-\n,CR-\r,CRLF-\r\n,这里判断\n 
	                boolean hasLF = false;//是否有换行符
	                for(int i=0;i < bs.length;i++) {  
	                    if(bs[i] == LF) {  
	                    	hasLF = true;  
	                        int lineNum = i - startNum;
	                        byte[] lineByte = new byte[temp.length+lineNum];  
		                    System.arraycopy(temp,0,lineByte,0,temp.length);  
		                    temp = new byte[0];  
		                    System.arraycopy(bs,startNum,lineByte,temp.length,lineNum);  
//		                    parseLine(/**redis,*/ new String(lineByte), table);
		                    final String line = new String(lineByte);
		                    lineByte = new byte[0];
		                    try {
		                    	pool.execute(new Runnable() {
									public void run() {
										parseLine(line, table);
										System.gc();
									}
								});
		                    	System.out.println("线程池中线程数目："+pool.getPoolSize()+"，队列中等待执行的任务数目："+
		                    			pool.getQueue().size()+"，已执行玩别的任务数目："+pool.getCompletedTaskCount());
							} catch (Exception e) {
								e.printStackTrace();
							}
		                    
		                    startNum = i;
	                    }  
	                } 
	                if (hasLF) {
	                    //将换行符之后的内容(去除换行符)存到temp中  
	                    temp = new byte[bs.length-startNum-1];  
	                    System.arraycopy(bs,startNum+1,temp,0,bs.length-startNum-1); 
					} else {  
		                //如果没出现换行符，则将内容保存到temp中  
		                byte[] toTemp = new byte[temp.length + bs.length];  
		                System.arraycopy(temp, 0, toTemp, 0, temp.length);  
		                System.arraycopy(bs, 0, toTemp, temp.length, bs.length);  
		                temp = toTemp;  
		            }  
	                long end = System.currentTimeMillis();
	        		System.out.println("读取该段耗时 ："+(end - start)+"ms");
	        		System.gc();
	        		displayAvailableMemory();
	                
		        }
		        fileChannel.close();
			} catch (IOException e) {
				log.error(req+"，文件解析失败："+e.getMessage());
				continue;
			}
		}
		// 关闭线程池
		pool.shutdown();
		
		log.info("解析请求日志结束！");
	}
	
	
	
	public void parseLine(String line, String table){
		long start = System.currentTimeMillis();
		try {
			long start1 = System.currentTimeMillis();
			Log log = converReq(line.split(Config.REQ_SEPARTOR));
			line = null;
			if (log == null) {
				return ;
			}
			String key = MD5.compute(log.toString());
			Log oldLog = map.get(key);
			if(oldLog != null){
				log.setPv(oldLog.getPv()+1);
			} else {
				log.setPv(1);
			}
			map.put(key, log);
			long end1 = System.currentTimeMillis();
			System.out.println("存map前流程处理时间 ："+(end1 - start1)+"ms");
			setMap(log, key);
			System.out.println("map的大小:"+map.size());
			System.out.println("areaMap的大小:"+areaMap.size());
			
			// 初步判断,避免影响性能不设置锁
			if (map.size() > size) {
				// 条件成立获取锁
				synchronized (this) {
					System.out.println(Thread.currentThread().getName()+"获得当前锁");
					// 为避免获取锁之后其余线程已执行完毕再次判断
					if (map.size() > size) {
						// 保存开始
						saveMap(table);
					}
				}
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		System.out.println("redis完整一次处理耗时 ："+(end - start)+"ms");
		
	}
	
	private void setMap(Log log, String key){
		long start = System.currentTimeMillis();
		if (areaMap.get(Config.AREA_ + log.getArea()) == null) {
			Set<String> set = new HashSet<String> ();
			set.add(key);
			areaMap.put(Config.AREA_ + log.getArea(), set);
		} else {
			areaMap.get(Config.AREA_ + log.getArea()).add(key);
		}
		
		if (catMap.get(Config.CAT_ + log.getCat_id()) == null) {
			Set<String> set = new HashSet<String> ();
			set.add(key);
			catMap.put(Config.CAT_ + log.getCat_id(), set);
		} else {
			catMap.get(Config.CAT_ + log.getCat_id()).add(key);
		}
		
		if (exchangeMap.get(Config.EXCHANGE_ + log.getExchange_id()) == null) {
			Set<String> set = new HashSet<String> ();
			set.add(key);
			exchangeMap.put(Config.EXCHANGE_ + log.getExchange_id(), set);
		} else {
			exchangeMap.get(Config.EXCHANGE_ + log.getExchange_id()).add(key);
		}
		
		if (hourMap.get(Config.HOUR_ + log.getTime()) == null) {
			Set<String> set = new HashSet<String> ();
			set.add(key);
			hourMap.put(Config.HOUR_ + log.getTime(), set);
		} else {
			hourMap.get(Config.HOUR_ + log.getTime()).add(key);
		}
		
		if (mediaMap.get(Config.MEDIA_ + log.getMedia_id()) == null) {
			Set<String> set = new HashSet<String> ();
			set.add(key);
			mediaMap.put(Config.MEDIA_ + log.getMedia_id(), set);
		} else {
			mediaMap.get(Config.MEDIA_ + log.getMedia_id()).add(key);
		}
		
		if (show_typeMap.get(Config.SHOW_TYPE_ + log.getShow_type()) == null) {
			Set<String> set = new HashSet<String> ();
			set.add(key);
			show_typeMap.put(Config.SHOW_TYPE_ + log.getShow_type(), set);
		} else {
			show_typeMap.get(Config.SHOW_TYPE_ + log.getShow_type()).add(key);
		}
		
		if (sizeMap.get(Config.SIZE_ + log.getSize()) == null) {
			Set<String> set = new HashSet<String> ();
			set.add(key);
			sizeMap.put(Config.SIZE_ + log.getSize(), set);
		} else {
			sizeMap.get(Config.SIZE_ + log.getSize()).add(key);
		}
		
		if (spaceMap.get(Config.SPACE_ + log.getSpace_id()) == null) {
			Set<String> set = new HashSet<String> ();
			set.add(key);
			spaceMap.put(Config.SPACE_ + log.getSpace_id(), set);
		} else {
			spaceMap.get(Config.SPACE_ + log.getSpace_id()).add(key);
		}
		long end = System.currentTimeMillis();
		System.out.println("setMap一次耗时 ："+(end - start)+"ms");
	}
	
	private void saveMap(String table){
		synchronized (map) {
			if (map.size() > size) {
				long start = System.currentTimeMillis();
				System.out.println("map条数>"+size+"，触发保存操作");
				Jedis redis = jedis.getJedis();
				try {
					
					for (String k : map.keySet()) {
						Log l = map.get(k);
						if(redis.hexists(table.getBytes(),k.getBytes())){
							Log log = (Log)SerializeUtil.unserialize(redis.hget(table.getBytes(), k.getBytes()));
							l.setPv(log.getPv()+l.getPv());
						}
						redis.hset(table.getBytes(),k.getBytes(),SerializeUtil.serialize(l));
				    }
					Pipeline p1 = redis.pipelined();
					for (String k : areaMap.keySet()) {
						p1.sadd(k,areaMap.get(k).toArray(new String[]{}));
				    }
					
					for (String k : catMap.keySet()) {
						p1.sadd(k,catMap.get(k).toArray(new String[]{}));
				    }
					
					for (String k : sizeMap.keySet()) {
						p1.sadd(k,sizeMap.get(k).toArray(new String[]{}));
				    }
					
					for (String k : hourMap.keySet()) {
						p1.sadd(k,hourMap.get(k).toArray(new String[]{}));
				    }
					
					for (String k : show_typeMap.keySet()) {
						p1.sadd(k,show_typeMap.get(k).toArray(new String[]{}));
				    }
					
					for (String k : mediaMap.keySet()) {
						p1.sadd(k,mediaMap.get(k).toArray(new String[]{}));
				    }
					
					for (String k : spaceMap.keySet()) {
						p1.sadd(k,spaceMap.get(k).toArray(new String[]{}));
				    }
					
					for (String k : exchangeMap.keySet()) {
						p1.sadd(k,exchangeMap.get(k).toArray(new String[]{}));
				    }
//					for (String k : map.keySet()) {
//						Log l = map.get(k);
//						if(redis.hexists(table.getBytes(),k.getBytes())){
//							Log log = (Log)SerializeUtil.unserialize(redis.hget(table.getBytes(), k.getBytes()));
//							l.setPv(log.getPv()+l.getPv());
//						}
//						redis.hset(table.getBytes(),k.getBytes(),SerializeUtil.serialize(l));
//				    }
//					for (String k : areaMap.keySet()) {
//						redis.sadd(k,areaMap.get(k).toArray(new String[]{}));
//				    }
//					
//					for (String k : catMap.keySet()) {
//						redis.sadd(k,catMap.get(k).toArray(new String[]{}));
//				    }
//					
//					for (String k : sizeMap.keySet()) {
//						redis.sadd(k,sizeMap.get(k).toArray(new String[]{}));
//				    }
//					
//					for (String k : hourMap.keySet()) {
//						redis.sadd(k,hourMap.get(k).toArray(new String[]{}));
//				    }
//					
//					for (String k : show_typeMap.keySet()) {
//						redis.sadd(k,show_typeMap.get(k).toArray(new String[]{}));
//				    }
//					
//					for (String k : mediaMap.keySet()) {
//						redis.sadd(k,mediaMap.get(k).toArray(new String[]{}));
//				    }
//					
//					for (String k : spaceMap.keySet()) {
//						redis.sadd(k,spaceMap.get(k).toArray(new String[]{}));
//				    }
//					
//					for (String k : exchangeMap.keySet()) {
//						redis.sadd(k,exchangeMap.get(k).toArray(new String[]{}));
//				    }
					exchangeMap.clear();
					map.clear();
					areaMap.clear();
					spaceMap.clear();
					mediaMap.clear();
					show_typeMap.clear();
					hourMap.clear();
					sizeMap.clear();
					catMap.clear();
					System.out.println("redis保存完毕！");
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					jedis.returnJedis(redis);
				}
				
				long end = System.currentTimeMillis();
				System.out.println("redis完整一次处理耗时 ："+(end - start)+"ms");
			}
			
		}
		
	} 
	
	
	public static void minBufReadByNio(String filePath){
		try {
			int bufSize = 10;// 一次读取的字节长度,下边的处理方法必须设置小着读，要保证一次读取没有两行的情况
			FileChannel fileChannel = new RandomAccessFile(filePath,"r").getChannel(); 
			ByteBuffer rBuffer = ByteBuffer.allocate(bufSize); // 读取设置长度的字符
			//使用temp字节数组用于存储不完整的行的内容  
	        byte[] temp = new byte[0];  
	        while(fileChannel.read(rBuffer) != -1) { 
	        	// position读取结束后的位置，相当于读取的长度
	        	byte[] bs = new byte[rBuffer.position()]; //用来存放读取的内容的数组
	        	rBuffer.rewind(); //将position设回0,所以你可以重读Buffer中的所有数据,此处如果不设置,无法使用下面的get方法  
	        	rBuffer.get(bs); //相当于rBuffer.get(bs,0,bs.length())：从position初始位置开始相对读,读bs.length个byte,并写入bs[0]到bs[bs.length-1]的区域 
	        	rBuffer.clear(); //这块的内容已经放入到bs，rbuffer清空
	        	
	        	// 开始处理已经读取到的内容
	        	int startNum = 0;  
	            int LF = 10;//换行符  
	            //判断是否出现了换行符，注意这要区分LF-\n,CR-\r,CRLF-\r\n,这里判断\n 
	            boolean hasLF = false;//是否有换行符
	            for(int i=0;i < bs.length;i++) {  
	                if(bs[i] == LF) {  
	                	hasLF = true;  
	                    startNum = i;  
	                }  
	            } 
	            if (hasLF) {
	            	//如果出现了换行符，将temp中的内容与换行符之前的内容拼接  
	                byte[] toTemp = new byte[temp.length+startNum];  
	                System.arraycopy(temp,0,toTemp,0,temp.length);  
	                System.arraycopy(bs,0,toTemp,temp.length,startNum);  
//	                System.out.println(new String(toTemp));  
//	                System.out.println("");
	                //将换行符之后的内容(去除换行符)存到temp中  
	                temp = new byte[bs.length-startNum-1];  
	                System.arraycopy(bs,startNum+1,temp,0,bs.length-startNum-1); 
				} else {  
	                //如果没出现换行符，则将内容保存到temp中  
	                byte[] toTemp = new byte[temp.length + bs.length];  
	                System.arraycopy(temp, 0, toTemp, 0, temp.length);  
	                System.arraycopy(bs, 0, toTemp, temp.length, bs.length);  
	                temp = toTemp;  
	            }  
	            
	        }
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void maxBufReadByNio(String filePath){
		 try {
	        	int bufSize = 10000000;// 一次读取的字节长度
				FileChannel fileChannel = new RandomAccessFile(filePath,"r").getChannel(); 
				ByteBuffer rBuffer = ByteBuffer.allocate(bufSize); // 读取设置长度的字符
				//使用temp字节数组用于存储不完整的行的内容  
		        byte[] temp = new byte[0];  
		        while(fileChannel.read(rBuffer) != -1) { 
		        	// position读取结束后的位置，相当于读取的长度
		        	byte[] bs = new byte[rBuffer.position()]; //用来存放读取的内容的数组
		        	rBuffer.rewind(); //将position设回0,所以你可以重读Buffer中的所有数据,此处如果不设置,无法使用下面的get方法  
		        	rBuffer.get(bs); //相当于rBuffer.get(bs,0,bs.length())：从position初始位置开始相对读,读bs.length个byte,并写入bs[0]到bs[bs.length-1]的区域 
		        	rBuffer.clear(); //这块的内容已经放入到bs，rbuffer清空
		        	
		        	// 开始处理已经读取到的内容
		        	int startNum = 0;  
	                int LF = 10;//换行符  
	              //判断是否出现了换行符，注意这要区分LF-\n,CR-\r,CRLF-\r\n,这里判断\n 
	                boolean hasLF = false;//是否有换行符
	                for(int i=0;i < bs.length;i++) {  
	                    if(bs[i] == LF) {  
	                    	hasLF = true;  
	                        int lineNum = i - startNum;
	                        byte[] lineByte = new byte[temp.length+lineNum];  
		                    System.arraycopy(temp,0,lineByte,0,temp.length);  
		                    temp = new byte[0];  
		                    System.arraycopy(bs,startNum,lineByte,temp.length,lineNum);  
		                    String line = new String(lineByte);
		                    System.out.println(line.replace(Config.REQ_SEPARTOR, ",")); 
		                    startNum = i;
	                    }  
	                } 
	                if (hasLF) {
	                    //将换行符之后的内容(去除换行符)存到temp中  
	                    temp = new byte[bs.length-startNum-1];  
	                    System.arraycopy(bs,startNum+1,temp,0,bs.length-startNum-1); 
					} else {  
		                //如果没出现换行符，则将内容保存到temp中  
		                byte[] toTemp = new byte[temp.length + bs.length];  
		                System.arraycopy(temp, 0, toTemp, 0, temp.length);  
		                System.arraycopy(bs, 0, toTemp, temp.length, bs.length);  
		                temp = toTemp;  
		            }  
		        }
			} catch (Exception e) {
				e.printStackTrace();
			}
	}
	
	public static void displayAvailableMemory() {  
        DecimalFormat df = new DecimalFormat("0.00");  
  
        //显示JVM总内存  
        long totalMem = Runtime.getRuntime().totalMemory();  
        System.out.println("JVM总內存："+df.format(totalMem/1000000F) + " MB");  
        //显示JVM尝试使用的最大内存  
        long maxMem = Runtime.getRuntime().maxMemory();  
        System.out.println("JVM尝试使用的最大内存 ："+df.format(maxMem/1000000F) + " MB");  
        //空闲内存  
        long freeMem = Runtime.getRuntime().freeMemory();  
        System.out.println("空闲内存："+df.format(freeMem/1000000F) + " MB");  
    }  
	
	
	public static void main(String[] args) {
		// long start = System.currentTimeMillis();
		// maxBufReadByNio("E://gk/1496211930645-17386");
		// long end = System.currentTimeMillis();
		// System.out.println("max 耗时 ："+(end - start)+"ms");
		// start = System.currentTimeMillis();
		// minBufReadByNio("E://gk/1496211930645-17386");
		// end = System.currentTimeMillis();
		// System.out.println("min 耗时 ："+(end - start)+"ms");
		// final List<String> list = new ArrayList<String>();
		// list.add("a");
		// System.out.println(list.size());
//		for (int i = 0; i < 10000; i++) {
//			
//			StringBuffer sb = new StringBuffer();
//			for (int j = 0; j < 10000; j++) {
//				sb.append("2b");
//			}
//			final byte[] lineByte = sb.toString().getBytes();
//			System.gc();
//			try {
//				Thread.sleep(2000);
//			} catch (InterruptedException e) {
//				// TODO Auto-generated catch block
//				e.printStackTrace();
//			}
//			displayAvailableMemory();
//		}
		maxBufReadByNio("C:/Users/bosha/Desktop/part-r-00000");
	}
	
	
//	String areaList = Config.AREA_ + log.getArea();
//	p1.sadd(areaList, key);
//	String exchangeList = Config.EXCHANGE_ + log.getExchange_id();
//	p1.sadd(exchangeList, key);
//	String sizeList = Config.SIZE_ + log.getSize();
//	p1.sadd(sizeList, key);
//	String hourList = Config.HOUR_ + log.getTime();
//	p1.sadd(hourList, key);
//	String show_typeList = Config.SHOW_TYPE_ + log.getShow_type();
//	p1.sadd(show_typeList, key);
//	String mediaList = Config.MEDIA_ + log.getMedia_id();
//	p1.sadd(mediaList, key);
//	String spaceList = Config.SPACE_ + log.getSpace_id();
//	p1.sadd(spaceList, key);
//	String catList = Config.CAT_ + log.getCat_id();
//	p1.sadd(catList, key);
}
